/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#define USING_LOG_PREFIX SHARE
#include "ob_index_builder_util.h"

#include "share/ob_define.h"
#include "lib/container/ob_array_iterator.h"
#include "lib/container/ob_array.h"
#include "share/schema/ob_table_schema.h"
#include "share/schema/ob_multi_version_schema_service.h"
#include "share/partition_table/ob_replica_filter.h"
#include "share/ob_get_compat_mode.h"
#include "sql/resolver/ddl/ob_ddl_resolver.h"
#include "sql/resolver/ob_resolver_utils.h"
#include "sql/resolver/expr/ob_raw_expr_util.h"
#include "sql/resolver/expr/ob_raw_expr_printer.h"
namespace oceanbase {
using namespace common;
using namespace common::sqlclient;
using namespace obrpc;
using namespace share::schema;
using namespace sql;
namespace share {
int ObIndexBuilderUtil::add_column(const ObColumnSchemaV2* data_column, const bool is_index, const bool is_rowkey,
    const ObOrderType order_type, ObRowDesc& row_desc, ObTableSchema& table_schema, const bool is_hidden /* = false */)
{
  int ret = OB_SUCCESS;
  if (NULL == data_column) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("data_column is null", KP(data_column), K(ret));
  } else if (OB_INVALID_INDEX != row_desc.get_idx(data_column->get_table_id(), data_column->get_column_id())) {
    if (is_index) {
      ret = OB_ERR_COLUMN_DUPLICATE;
      const ObString& column_name = data_column->get_column_name_str();
      LOG_USER_ERROR(OB_ERR_COLUMN_DUPLICATE, column_name.length(), column_name.ptr());
    } else {
      // column alread exist, ignore it
    }
  } else if (OB_FAIL(row_desc.add_column_desc(data_column->get_table_id(), data_column->get_column_id()))) {
    LOG_WARN("add_column_desc failed",
        "table_id",
        data_column->get_table_id(),
        "column_id",
        data_column->get_column_id(),
        K(ret));
  } else {
    ObColumnSchemaV2 column = *data_column;
    column.set_table_id(OB_INVALID_ID);
    column.set_autoincrement(false);
    column.set_tbl_part_key_pos(0);
    column.set_prev_column_id(UINT64_MAX);
    column.set_next_column_id(UINT64_MAX);
    if (is_hidden) {
      column.set_is_hidden(is_hidden);
    }

    // Generated column of index table's value is generated by data table. Default value
    // (generated column expression) is not needed, and it introduce failure on global index
    // partition location lookup (refer columns not exist in index table).
    // Fulltext column's default value is needed, because we need to set GENERATED_CTXCAT_CASCADE_FLAG
    // flag by parsing the default value.
    if (column.is_generated_column() && !column.is_fulltext_column()) {
      if (column.is_virtual_generated_column()) {
        column.del_column_flag(VIRTUAL_GENERATED_COLUMN_FLAG);
      }
      if (column.is_stored_generated_column()) {
        column.del_column_flag(STORED_GENERATED_COLUMN_FLAG);
      }
      ObObj obj;
      obj.set_null();
      column.set_cur_default_value(obj);
      column.set_orig_default_value(obj);
    }

    if (is_rowkey) {
      column.set_rowkey_position(row_desc.get_column_num());
      column.set_order_in_rowkey(order_type);
    } else {
      column.set_rowkey_position(0);
    }

    if (is_index) {
      column.set_index_position(row_desc.get_column_num());
      column.set_order_in_rowkey(order_type);
    } else {
      column.set_index_position(0);
    }
    // index column are not auto increment
    column.set_autoincrement(false);

    if (OB_FAIL(table_schema.add_column(column))) {
      LOG_WARN("add_column failed", K(column), K(ret));
    }
  }
  return ret;
}

int ObIndexBuilderUtil::add_shadow_pks(
    const ObTableSchema& data_schema, ObRowDesc& row_desc, ObTableSchema& schema, bool check_data_schema /*=true*/)
{
  int ret = OB_SUCCESS;
  if (check_data_schema && !data_schema.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(data_schema), K(ret));
  } else {
    const bool is_index_column = false;
    const bool is_rowkey = true;
    const ObColumnSchemaV2* const_data_column = NULL;
    ObColumnSchemaV2 data_column;
    const ObRowkeyInfo& rowkey_info = data_schema.get_rowkey_info();
    char shadow_pk_name[OB_MAX_COLUMN_NAME_BUF_LENGTH];
    for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_info.get_size(); ++i) {
      uint64_t column_id = OB_INVALID_ID;
      int n = snprintf(shadow_pk_name, OB_MAX_COLUMN_NAME_BUF_LENGTH, "shadow_pk_%ld", i);
      if (n < 0 || n > OB_MAX_COLUMN_NAME_LENGTH) {
        ret = OB_BUF_NOT_ENOUGH;
        LOG_ERROR(
            "failed to generate shadow_pk_name", K(i), K(n), "buffer size", OB_MAX_COLUMN_NAME_BUF_LENGTH, K(ret));
      } else if (OB_FAIL(rowkey_info.get_column_id(i, column_id))) {
        LOG_WARN("get_column_id failed", "index", i, K(ret));
      } else if (NULL == (const_data_column = (data_schema.get_column_schema(column_id)))) {
        ret = OB_ERR_BAD_FIELD_ERROR;
        LOG_WARN("get_column_schema failed", "table_id", data_schema.get_table_id(), K(column_id), K(ret));
      } else if (ob_is_text_tc(const_data_column->get_data_type())) {
        ret = OB_ERR_WRONG_KEY_COLUMN;
        LOG_WARN("Unexpected lob column in shadow pk", "table_id", data_schema.get_table_id(), K(column_id), K(ret));
      } else {
        data_column = *const_data_column;
        data_column.set_nullable(true);
        data_column.set_tbl_part_key_pos(0);
        data_column.set_column_id(OB_MIN_SHADOW_COLUMN_ID + const_data_column->get_column_id());
        data_column.set_is_hidden(true);
        if (OB_FAIL(data_column.set_column_name(shadow_pk_name))) {
          LOG_WARN("set_column_name failed", K(shadow_pk_name), K(ret));
        } else {
          // primary key(uk2, uk1)
          if (data_column.get_column_id() > schema.get_max_used_column_id()) {
            schema.set_max_used_column_id(data_column.get_column_id());
          }

          if (OB_FAIL(add_column(
                  &data_column, is_index_column, is_rowkey, data_column.get_order_in_rowkey(), row_desc, schema))) {
            LOG_WARN("add column failed",
                "data_column",
                data_column,
                K(is_index_column),
                "order_in_rowkey",
                data_column.get_order_in_rowkey(),
                K(row_desc),
                K(ret));
          }
        }
      }
    }
  }
  return ret;
}

int ObIndexBuilderUtil::set_index_table_columns(const ObCreateIndexArg& arg, const ObTableSchema& data_schema,
    ObTableSchema& index_schema, bool check_data_schema /*=true*/)
{
  int ret = OB_SUCCESS;
  if (check_data_schema && !data_schema.is_valid()) {
    // some items in arg may be invalid, don't check arg
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(data_schema), K(ret));
  } else {
    ObRowDesc row_desc;
    bool is_index_column = false;
    // index columns
    // attention: must add index column first
    for (int64_t i = 0; OB_SUCC(ret) && i < arg.index_columns_.count(); ++i) {
      const ObColumnSchemaV2* data_column = NULL;
      const bool is_rowkey = true;
      is_index_column = true;
      const ObColumnSortItem& sort_item = arg.index_columns_.at(i);
      if (NULL == (data_column = data_schema.get_column_schema(sort_item.column_name_))) {
        ret = OB_ERR_KEY_COLUMN_DOES_NOT_EXITS;
        LOG_USER_ERROR(OB_ERR_KEY_COLUMN_DOES_NOT_EXITS, sort_item.column_name_.length(), sort_item.column_name_.ptr());
        LOG_WARN("get_column_schema failed",
            "tenant_id",
            data_schema.get_tenant_id(),
            "database_id",
            data_schema.get_database_id(),
            "table_name",
            data_schema.get_table_name(),
            "column name",
            sort_item.column_name_,
            K(ret));
      } else if (ob_is_text_tc(data_column->get_data_type())) {
        ret = OB_ERR_WRONG_KEY_COLUMN;
        LOG_USER_ERROR(OB_ERR_WRONG_KEY_COLUMN, sort_item.column_name_.length(), sort_item.column_name_.ptr());
        LOG_WARN("Index column should not be lob type",
            "tenant_id",
            data_schema.get_tenant_id(),
            "database_id",
            data_schema.get_database_id(),
            "table_name",
            data_schema.get_table_name(),
            "column name",
            sort_item.column_name_,
            "column length",
            sort_item.prefix_len_,
            K(ret));
      } else if (OB_FAIL(add_column(data_column,
                     is_index_column,
                     is_rowkey,
                     arg.index_columns_.at(i).order_type_,
                     row_desc,
                     index_schema))) {
        LOG_WARN("add column failed",
            "data_column",
            *data_column,
            K(is_index_column),
            K(is_rowkey),
            "rowkey_order_type",
            arg.index_columns_.at(i).order_type_,
            K(row_desc),
            K(ret));
      }
    }
    if (OB_SUCC(ret)) {
      index_schema.set_index_column_num(row_desc.get_column_num());
      index_schema.set_rowkey_column_num(row_desc.get_column_num());
    }

    // - if not unique index, add data table's rowkey to index table's rowkey
    // - if unique index, add data schema's rowkey to redundant storing columns
    // For unique index, another hidden column would append to rowkey, which
    // before data schema's rowkey.

    // add hidden ghost_pk columns for unique index
    const bool is_unique = index_schema.is_unique_index();
    if (OB_SUCC(ret) && is_unique) {
      if (OB_FAIL(add_shadow_pks(data_schema, row_desc, index_schema, check_data_schema))) {
        LOG_WARN("add_shadow_pks failed", K(data_schema), K(row_desc), K(ret));
      }
    }

    if (OB_SUCC(ret)) {
      is_index_column = false;
      const ObColumnSchemaV2* data_column = NULL;
      const ObRowkeyInfo& rowkey_info = data_schema.get_rowkey_info();
      for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_info.get_size(); ++i) {
        uint64_t column_id = OB_INVALID_ID;
        const bool is_rowkey = !index_schema.is_unique_index();
        if (OB_FAIL(rowkey_info.get_column_id(i, column_id))) {
          LOG_WARN("get_column_id failed", "index", i, K(ret));
        } else if (NULL == (data_column = data_schema.get_column_schema(column_id))) {
          ret = OB_ERR_BAD_FIELD_ERROR;
          LOG_WARN("get_column_schema failed", "table_id", data_schema.get_table_id(), K(column_id), K(ret));
        } else if (ob_is_text_tc(data_column->get_data_type())) {
          ret = OB_ERR_WRONG_KEY_COLUMN;
          LOG_WARN("Lob column should not appear in rowkey position",
              "data_column",
              *data_column,
              K(is_index_column),
              K(is_rowkey),
              "order_in_rowkey",
              data_column->get_order_in_rowkey(),
              K(row_desc),
              K(ret));
        } else if (OB_FAIL(add_column(data_column,
                       is_index_column,
                       is_rowkey,
                       data_column->get_order_in_rowkey(),
                       row_desc,
                       index_schema))) {
          LOG_WARN("add column failed",
              "data_column",
              *data_column,
              K(is_index_column),
              K(is_rowkey),
              "order_in_rowkey",
              data_column->get_order_in_rowkey(),
              K(row_desc),
              K(ret));
        }
      }
    }

    // if not unique index, update rowkey_column_num
    if (OB_SUCCESS == ret && !index_schema.is_unique_index()) {
      index_schema.set_rowkey_column_num(row_desc.get_column_num());
    }

    // add redundant storing columns
    if (OB_SUCC(ret)) {
      for (int64_t i = 0; OB_SUCC(ret) && i < arg.store_columns_.count(); ++i) {
        const ObColumnSchemaV2* data_column = NULL;
        const bool is_rowkey = false;
        // is_rowkey is false, order_in_rowkey will not be used
        const ObOrderType order_in_rowkey = ObOrderType::DESC;
        if (NULL == (data_column = data_schema.get_column_schema(arg.store_columns_.at(i)))) {
          ret = OB_ERR_BAD_FIELD_ERROR;
          LOG_WARN("get_column_schema failed",
              "tenant_id",
              data_schema.get_tenant_id(),
              "database_id",
              data_schema.get_database_id(),
              "table_name",
              data_schema.get_table_name(),
              "column name",
              arg.store_columns_.at(i),
              K(ret));
        } else if (ob_is_text_tc(data_column->get_data_type())) {
          ret = OB_ERR_WRONG_KEY_COLUMN;
          LOG_USER_ERROR(OB_ERR_WRONG_KEY_COLUMN, arg.store_columns_.at(i).length(), arg.store_columns_.at(i).ptr());
          LOG_WARN("Index storing column should not be lob type",
              "tenant_id",
              data_schema.get_tenant_id(),
              "database_id",
              data_schema.get_database_id(),
              "table_name",
              data_schema.get_table_name(),
              "column name",
              arg.store_columns_.at(i),
              K(ret));
        } else if (OB_FAIL(
                       add_column(data_column, is_index_column, is_rowkey, order_in_rowkey, row_desc, index_schema))) {
          LOG_WARN("add_column failed",
              "data_column",
              *data_column,
              K(is_index_column),
              K(is_rowkey),
              K(order_in_rowkey),
              K(row_desc),
              K(ret));
        }
      }
    }

    // add redundant hidden storing columns
    if (OB_SUCC(ret)) {
      for (int64_t i = 0; OB_SUCC(ret) && i < arg.hidden_store_columns_.count(); ++i) {
        const ObColumnSchemaV2* data_column = NULL;
        const bool is_rowkey = false;
        // is_rowkey is false, order_in_rowkey will not be used
        const ObOrderType order_in_rowkey = ObOrderType::DESC;
        if (OB_ISNULL(data_column = data_schema.get_column_schema(arg.hidden_store_columns_.at(i)))) {
          ret = OB_ERR_BAD_FIELD_ERROR;
          LOG_WARN("get_column_schema failed",
              "tenant_id",
              data_schema.get_tenant_id(),
              "database_id",
              data_schema.get_database_id(),
              "table_name",
              data_schema.get_table_name(),
              "column name",
              arg.hidden_store_columns_.at(i),
              K(ret));
        } else if (ob_is_text_tc(data_column->get_data_type())) {
          ret = OB_ERR_WRONG_KEY_COLUMN;
          LOG_USER_ERROR(
              OB_ERR_WRONG_KEY_COLUMN, arg.hidden_store_columns_.at(i).length(), arg.hidden_store_columns_.at(i).ptr());
          LOG_WARN("Index storing column should not be lob type",
              "tenant_id",
              data_schema.get_tenant_id(),
              "database_id",
              data_schema.get_database_id(),
              "table_name",
              data_schema.get_table_name(),
              "column name",
              arg.hidden_store_columns_.at(i),
              K(ret));
        } else if (OB_FAIL(add_column(
                       data_column, is_index_column, is_rowkey, order_in_rowkey, row_desc, index_schema, true))) {
          LOG_WARN("add_column failed",
              "data_column",
              *data_column,
              K(is_index_column),
              K(is_rowkey),
              K(order_in_rowkey),
              K(row_desc),
              K(ret));
        }
      }
    }
  }
  return ret;
}

int ObIndexBuilderUtil::adjust_expr_index_args(
    ObCreateIndexArg& arg, ObTableSchema& data_schema, ObIArray<ObColumnSchemaV2*>& gen_columns)
{
  int ret = OB_SUCCESS;
  if (arg.fulltext_columns_.count() > 0) {
    ObColumnSchemaV2* ft_col = NULL;
    if (OB_FAIL(adjust_fulltext_args(arg, data_schema, ft_col))) {
      LOG_WARN("adjust fulltext args failed", K(ret));
    } else if (ft_col != NULL) {
      if (OB_FAIL(gen_columns.push_back(ft_col))) {
        LOG_WARN("store fulltext column failed", K(ret));
      }
    }
  } else if (OB_FAIL(adjust_ordinary_index_column_args(arg, data_schema, gen_columns))) {
    LOG_WARN("adjust ordinary index column args failed", K(ret));
  }
  return ret;
}

int ObIndexBuilderUtil::adjust_fulltext_columns(ObCreateIndexArg& arg, OrderFTColumns& ft_columns)
{
  int ret = OB_SUCCESS;
  ObIArray<ObString>& fulltext_columns = arg.fulltext_columns_;
  ObIArray<ObColumnSortItem>& sort_items = arg.index_columns_;
  for (int64_t i = 0; OB_SUCC(ret) && i < fulltext_columns.count(); ++i) {
    const ObString& ft_name = fulltext_columns.at(i);
    bool found_ft = false;
    for (int64_t j = 0; OB_SUCC(ret) && !found_ft && j < sort_items.count(); ++j) {
      const ObColumnSortItem& sort_item = sort_items.at(j);
      if (ObCharset::case_insensitive_equal(sort_item.column_name_, ft_name)) {
        found_ft = true;
        ret = ft_columns.push_back(std::pair<int64_t, ObString>(j, ft_name));
      }
    }
    if (!found_ft) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("fulltext column not exists in index", K(ft_name));
    }
  }
  if (OB_SUCC(ret)) {
    std::sort(ft_columns.begin(), ft_columns.end(), FulltextColumnOrder());
  }
  for (int64_t i = 1; OB_SUCC(ret) && i < ft_columns.count(); ++i) {
    if (ft_columns.at(i).first - ft_columns.at(i - 1).first != 1) {
      ret = OB_ERR_BAD_CTXCAT_COLUMN;
      LOG_USER_ERROR(OB_ERR_BAD_CTXCAT_COLUMN);
    }
  }
  return ret;
}

int ObIndexBuilderUtil::adjust_fulltext_args(
    ObCreateIndexArg& arg, ObTableSchema& data_schema, ObColumnSchemaV2*& ft_col)
{
  ft_col = NULL;
  int ret = OB_SUCCESS;
  ObIArray<ObString>& fulltext_columns = arg.fulltext_columns_;
  ObIArray<ObColumnSortItem>& sort_items = arg.index_columns_;
  ObArray<ObColumnSortItem> new_sort_items;
  uint64_t virtual_column_id = OB_INVALID_ID;
  if (fulltext_columns.count() > 0) {
    OrderFTColumns order_ft_columns;
    int64_t ft_begin_index = 0;  // first column idx in ft index
    int64_t ft_end_index = 0;    // last column idx in ft index
    if (sort_items.count() <= 0) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("sort items is empty", K(ret));
    } else if (OB_INVALID_ID != sort_items.at(0).column_id_) {
      // for restore purpose
      virtual_column_id = sort_items.at(0).column_id_;
      sort_items.at(0).column_id_ = OB_INVALID_ID;
    }
    if (OB_SUCC(ret)) {
      if (OB_FAIL(adjust_fulltext_columns(arg, order_ft_columns))) {
        LOG_WARN("adjust fulltext columns to order fulltext columns failed", K(ret));
      } else if (order_ft_columns.empty()) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("order fulltext columns is empty");
      } else {
        ft_begin_index = order_ft_columns.at(0).first;
        ft_end_index = order_ft_columns.at(order_ft_columns.count() - 1).first;
      }
    }
    for (int64_t i = 0; OB_SUCC(ret) && i < ft_begin_index; ++i) {
      ret = new_sort_items.push_back(sort_items.at(i));
    }
    if (OB_SUCC(ret)) {
      ObColumnSortItem ft_sort_item;
      int64_t old_cnt = data_schema.get_column_count();
      ObColumnSchemaV2* tmp_ft_col = NULL;
      if (OB_FAIL(generate_fulltext_column(order_ft_columns, data_schema, virtual_column_id, tmp_ft_col))) {
        LOG_WARN("generate fulltext column failed", K(ret));
      } else if (OB_ISNULL(tmp_ft_col)) {
        LOG_WARN("fulltext column schema is null", K(ret));
      } else if (FALSE_IT(ft_sort_item.column_name_ = tmp_ft_col->get_column_name_str())) {
      } else if (OB_FAIL(new_sort_items.push_back(ft_sort_item))) {
        LOG_WARN("store new sort items failed", K(ret));
      } else if (data_schema.get_column_count() > old_cnt) {
        ft_col = tmp_ft_col;
      }
    }
    for (int64_t i = ft_end_index + 1; OB_SUCC(ret) && i < sort_items.count(); ++i) {
      ret = new_sort_items.push_back(sort_items.at(i));
    }
    if (OB_SUCC(ret)) {
      sort_items.reset();
      fulltext_columns.reset();
      ret = sort_items.assign(new_sort_items);
    }
  }
  return ret;
}

int ObIndexBuilderUtil::adjust_ordinary_index_column_args(
    ObCreateIndexArg& arg, ObTableSchema& data_schema, ObIArray<ObColumnSchemaV2*>& gen_columns)
{
  int ret = OB_SUCCESS;
  ObIArray<ObColumnSortItem>& sort_items = arg.index_columns_;
  ObArray<ObColumnSortItem> new_sort_items;
  ObWorker::CompatMode compat_mode = ObWorker::CompatMode::MYSQL;
  uint64_t tenant_id = extract_tenant_id(data_schema.get_table_id());
  ObCompatModeGetter::get_tenant_mode(tenant_id, compat_mode);
  CompatModeGuard compat_guard(compat_mode);
  for (int64_t i = 0; OB_SUCC(ret) && i < sort_items.count(); ++i) {
    int64_t old_cnt = data_schema.get_column_count();
    ObColumnSortItem new_sort_item = sort_items.at(i);
    ObColumnSchemaV2* gen_col = NULL;
    if (new_sort_item.prefix_len_ > 0) {
      // handle prefix column index
      if (OB_FAIL(generate_prefix_column(new_sort_item, data_schema, gen_col))) {
        LOG_WARN("generate prefix column failed", K(ret));
      } else {
        new_sort_item.column_name_ = gen_col->get_column_name_str();
        new_sort_item.prefix_len_ = 0;
      }
    } else if (share::is_mysql_mode() || (share::is_oracle_mode() && !new_sort_item.is_func_index_)) {
      // for mysql mode, oracle mode with non-function index
      const ObColumnSchemaV2* col_schema = data_schema.get_column_schema(new_sort_item.column_name_);
      if (OB_ISNULL(col_schema)) {
        ret = OB_ERR_KEY_COLUMN_DOES_NOT_EXITS;
        LOG_USER_ERROR(
            OB_ERR_KEY_COLUMN_DOES_NOT_EXITS, new_sort_item.column_name_.length(), new_sort_item.column_name_.ptr());
      }
    } else if (share::is_mysql_mode()) {
      const ObColumnSchemaV2* col_schema = data_schema.get_column_schema(new_sort_item.column_name_);
      if (OB_ISNULL(col_schema)) {
        ret = OB_ERR_KEY_COLUMN_DOES_NOT_EXITS;
        LOG_USER_ERROR(
            OB_ERR_KEY_COLUMN_DOES_NOT_EXITS, new_sort_item.column_name_.length(), new_sort_item.column_name_.ptr());
      }
    } else {
      // parse ordinary index expr as the real expr(maybe column expression)
      const ObString& index_expr_def = new_sort_item.column_name_;
      ObArenaAllocator allocator(ObModIds::OB_SQL_EXPR);
      ObRawExprFactory expr_factory(allocator);
      ObSQLSessionInfo session;
      ObRawExpr* expr = NULL;
      if (OB_FAIL(session.init(
              0 /*default session version*/, 0 /*default session id*/, 0 /*default proxy id*/, &allocator))) {
        LOG_WARN("init session failed", K(ret));
      } else if (OB_FAIL(session.load_default_sys_variable(false, false))) {
        LOG_WARN("session load default system variable failed", K(ret));
      } else if (OB_FAIL(ObRawExprUtils::build_generated_column_expr(
                     index_expr_def, expr_factory, session, data_schema, expr))) {
        LOG_WARN("build generated column expr failed", K(ret));
      } else if (!expr->is_deterministic()) {
        ret = OB_ERR_ONLY_PURE_FUNC_CANBE_INDEXED;
        LOG_WARN("only pure functions can be indexed", K(ret));
      } else if (!expr->is_column_ref_expr()) {
        // real index expr, so generate hidden generated column in data table schema
        if (OB_FAIL(generate_ordinary_generated_column(*expr, data_schema, gen_col))) {
          LOG_WARN("generate ordinary generated column failed", K(ret));
        } else {
          new_sort_item.column_name_ = gen_col->get_column_name_str();
          new_sort_item.is_func_index_ = false;
        }
      } else {
        const ObColumnRefRawExpr* ref_expr = static_cast<const ObColumnRefRawExpr*>(expr);
        new_sort_item.column_name_ = ref_expr->get_column_name();
        new_sort_item.is_func_index_ = false;
      }
    }
    if (OB_SUCC(ret)) {
      if (OB_FAIL(new_sort_items.push_back(new_sort_item))) {
        LOG_WARN("store new sort item failed", K(ret), K(new_sort_item));
      } else if (data_schema.get_column_count() > old_cnt) {
        LOG_INFO("column info", KPC(gen_col), K(old_cnt), K(data_schema.get_column_count()));
        if (OB_FAIL(gen_columns.push_back(gen_col))) {
          LOG_WARN("store generated column failed", K(ret));
        }
      }
    }
  }
  if (OB_SUCC(ret)) {
    sort_items.reset();
    if (OB_FAIL(sort_items.assign(new_sort_items))) {
      LOG_WARN("assign sort items failed", K(ret));
    }
  }
  return ret;
}

int ObIndexBuilderUtil::generate_fulltext_column(
    OrderFTColumns& ft_cols, ObTableSchema& data_schema, uint64_t specified_virtual_cid, ObColumnSchemaV2*& ft_col)
{
  ft_col = NULL;
  int ret = OB_SUCCESS;
  ObColumnSchemaV2* col_schema = NULL;
  ObColumnSchemaV2 column_schema;
  char col_name_buf[OB_MAX_COLUMN_NAMES_LENGTH] = {'\0'};
  SMART_VAR(char[OB_MAX_DEFAULT_VALUE_LENGTH], ft_expr_def)
  {
    MEMSET(ft_expr_def, 0, sizeof(ft_expr_def));
    int64_t name_pos = 0;
    int64_t def_pos = 0;
    int32_t max_data_length = 0;
    ObCollationType collation_type = CS_TYPE_INVALID;
    if (OB_FAIL(databuff_printf(col_name_buf, OB_MAX_COLUMN_NAMES_LENGTH, name_pos, "__word_segment"))) {
      LOG_WARN("print generate column prefix name failed", K(ret));
    } else if (OB_FAIL(databuff_printf(ft_expr_def, OB_MAX_DEFAULT_VALUE_LENGTH, def_pos, "WORD_SEGMENT("))) {
      LOG_WARN("print generate expr definition prefix failed", K(ret));
    }
    for (int64_t i = 0; OB_SUCC(ret) && i < ft_cols.count(); ++i) {
      const ObString& column_name = ft_cols.at(i).second;
      if (OB_ISNULL(col_schema = data_schema.get_column_schema(column_name))) {
        ret = OB_ERR_KEY_COLUMN_DOES_NOT_EXITS;
        LOG_USER_ERROR(OB_ERR_KEY_COLUMN_DOES_NOT_EXITS, column_name.length(), column_name.ptr());
      } else if (!col_schema->is_string_type() || col_schema->get_meta_type().is_blob()) {
        ret = OB_ERR_BAD_FT_COLUMN;
        LOG_USER_ERROR(OB_ERR_BAD_FT_COLUMN, column_name.length(), column_name.ptr());
      } else if (col_schema->is_generated_column()) {
        ret = OB_NOT_SUPPORTED;
        LOG_USER_ERROR(OB_NOT_SUPPORTED, "CTXCAT on generated column");
      } else if (OB_FAIL(databuff_printf(
                     col_name_buf, OB_MAX_COLUMN_NAMES_LENGTH, name_pos, "_%ld", col_schema->get_column_id()))) {
        LOG_WARN("print column id to buffer failed", K(ret), K(col_schema->get_column_id()));
      } else if (OB_FAIL(databuff_printf(
                     ft_expr_def, OB_MAX_DEFAULT_VALUE_LENGTH, def_pos, "`%s`, ", col_schema->get_column_name()))) {
        LOG_WARN("print column name to buffer failed", K(ret));
      } else if (OB_FAIL(column_schema.add_cascaded_column_id(col_schema->get_column_id()))) {
        LOG_WARN("add cascaded column to generated column failed", K(ret));
      } else {
        col_schema->add_column_flag(GENERATED_DEPS_CASCADE_FLAG);
        if (max_data_length < col_schema->get_data_length()) {
          max_data_length = col_schema->get_data_length();
        }
        if (CS_TYPE_INVALID == collation_type) {
          collation_type = col_schema->get_collation_type();
        } else if (collation_type != col_schema->get_collation_type()) {
          ret = OB_NOT_SUPPORTED;
          LOG_USER_ERROR(OB_NOT_SUPPORTED, "create fulltext index on columns with different collation");
        } else { /*do nothing*/
        }
      }
    }
    if (OB_SUCC(ret)) {
      def_pos -= 2;  // remove last ", "
      if (OB_FAIL(databuff_printf(ft_expr_def, OB_MAX_DEFAULT_VALUE_LENGTH, def_pos, ")"))) {
        LOG_WARN("print generate expr definition suffix failed", K(ret));
      }
    }
    if (OB_SUCC(ret)) {
      // another fulltext index could have created the generated column
      ft_col = data_schema.get_column_schema(col_name_buf);
      if (OB_INVALID_ID != specified_virtual_cid) {
        if (OB_NOT_NULL(ft_col)) {
          // check the specified column id is consistent with the existed column schema
          if (specified_virtual_cid != ft_col->get_column_id()) {
            ret = OB_ERR_INVALID_COLUMN_ID;
            LOG_USER_ERROR(OB_ERR_INVALID_COLUMN_ID, static_cast<int>(name_pos), col_name_buf);
            LOG_WARN("Column id specified by create fulltext index mismatch with column schema id",
                K(ret),
                K(specified_virtual_cid),
                K(*ft_col));
          }
        } else if (OB_NOT_NULL(data_schema.get_column_schema(specified_virtual_cid))) {
          // check the specified column id is not used by others
          ret = OB_ERR_INVALID_COLUMN_ID;
          LOG_USER_ERROR(OB_ERR_INVALID_COLUMN_ID, static_cast<int>(name_pos), col_name_buf);
          LOG_WARN("Column id specified by create fulltext index has been used", K(ret), K(specified_virtual_cid));
        }
      }
      if (OB_FAIL(ret)) {
        // do nothing
      } else if (OB_NOT_NULL(ft_col)) {
        // the generated colum is created
        if (OB_UNLIKELY(!ft_col->has_column_flag(GENERATED_CTXCAT_CASCADE_FLAG))) {
          ret = OB_ERR_COLUMN_DUPLICATE;
          LOG_USER_ERROR(OB_ERR_COLUMN_DUPLICATE, static_cast<int>(name_pos), col_name_buf);
          LOG_WARN("Generate column name has been used", K(ret), K(*ft_col));
        }
      } else {
        // the generated column is not created
        ObObj default_value;
        default_value.set_varchar(ft_expr_def, static_cast<int32_t>(def_pos));
        column_schema.set_rowkey_position(0);
        column_schema.set_index_position(0);
        column_schema.set_tbl_part_key_pos(0);
        column_schema.set_tenant_id(data_schema.get_tenant_id());
        column_schema.set_table_id(data_schema.get_table_id());
        column_schema.set_column_id(
            OB_INVALID_ID == specified_virtual_cid ? data_schema.get_max_used_column_id() + 1 : specified_virtual_cid);
        column_schema.add_column_flag(GENERATED_CTXCAT_CASCADE_FLAG);
        column_schema.add_column_flag(VIRTUAL_GENERATED_COLUMN_FLAG);
        column_schema.set_is_hidden(true);
        column_schema.set_data_type(ObVarcharType);
        column_schema.set_data_length(max_data_length);
        column_schema.set_collation_type(collation_type);
        column_schema.set_prev_column_id(UINT64_MAX);
        column_schema.set_next_column_id(UINT64_MAX);
        if (OB_FAIL(column_schema.set_column_name(col_name_buf))) {
          LOG_WARN("set column name failed", K(ret));
        } else if (OB_FAIL(column_schema.set_orig_default_value(default_value))) {
          LOG_WARN("set orig default value failed", K(ret));
        } else if (OB_FAIL(column_schema.set_cur_default_value(default_value))) {
          LOG_WARN("set current default value failed", K(ret));
        } else if (OB_FAIL(data_schema.add_column(column_schema))) {
          LOG_WARN("add column schema to data table failed", K(ret));
        } else {
          ft_col = data_schema.get_column_schema(column_schema.get_column_id());
        }
      }
    }
  }
  return ret;
}

int ObIndexBuilderUtil::generate_ordinary_generated_column(
    ObRawExpr& expr, ObTableSchema& data_schema, ObColumnSchemaV2*& gen_col, const uint64_t index_id)
{
  int ret = OB_SUCCESS;
  ObColumnSchemaV2 tmp_gen_col;
  SMART_VAR(char[OB_MAX_DEFAULT_VALUE_LENGTH], expr_def_buf)
  {
    MEMSET(expr_def_buf, 0, sizeof(expr_def_buf));
    int64_t pos = 0;
    ObRawExprPrinter expr_printer(expr_def_buf, OB_MAX_DEFAULT_VALUE_LENGTH, &pos);
    const bool is_invalid = (index_id < OB_APP_MIN_COLUMN_ID || index_id > OB_MIN_SHADOW_COLUMN_ID);
    if (OB_FAIL(expr_printer.do_print(&expr, T_NONE_SCOPE, true))) {
      LOG_WARN("print expr definition failed", K(ret));
    } else {
      ObString expr_def(pos, expr_def_buf);
      ObColumnSchemaV2* old_gen_col = NULL;
      if (OB_FAIL(data_schema.get_generated_column_by_define(expr_def, true /*only hidden column*/, old_gen_col))) {
        LOG_WARN("get generated column by define failed", K(ret), K(expr_def));
      } else if (old_gen_col != NULL) {
        // got it
        gen_col = old_gen_col;
      } else {
        // need to add new generated column
        ObObj default_value;
        char col_name_buf[OB_MAX_COLUMN_NAMES_LENGTH] = {'\0'};
        pos = 0;
        default_value.set_varchar(expr_def);
        tmp_gen_col.set_rowkey_position(0);
        tmp_gen_col.set_index_position(0);
        tmp_gen_col.set_tbl_part_key_pos(0);
        tmp_gen_col.set_tenant_id(data_schema.get_tenant_id());
        tmp_gen_col.set_table_id(data_schema.get_table_id());
        tmp_gen_col.set_column_id(data_schema.get_max_used_column_id() + 1);
        tmp_gen_col.add_column_flag(VIRTUAL_GENERATED_COLUMN_FLAG);
        tmp_gen_col.set_is_hidden(true);
        tmp_gen_col.set_data_type(expr.get_data_type());
        tmp_gen_col.set_collation_type(expr.get_collation_type());
        tmp_gen_col.set_accuracy(expr.get_accuracy());
        tmp_gen_col.set_prev_column_id(UINT64_MAX);
        tmp_gen_col.set_next_column_id(UINT64_MAX);
        ObSEArray<ObRawExpr*, 4> dep_columns;
        if (OB_FAIL(ObRawExprUtils::extract_column_exprs(&expr, dep_columns))) {
          LOG_WARN("extract column exprs failed", K(ret), K(expr));
        }
        for (int64_t i = 0; OB_SUCC(ret) && i < dep_columns.count(); ++i) {
          const ObRawExpr* dep_column = dep_columns.at(i);
          if (OB_ISNULL(dep_column)) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("deps_column is null");
          } else if (!dep_column->is_column_ref_expr()) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("dep column is invalid", K(ret), KPC(dep_column));
          } else if (OB_FAIL(tmp_gen_col.add_cascaded_column_id(
                         static_cast<const ObColumnRefRawExpr*>(dep_column)->get_column_id()))) {
            LOG_WARN("add cascaded column id failed", K(ret));
          }
        }

        if (OB_SUCC(ret) && OB_INVALID_ID != index_id) {
          if (index_id != gen_col->get_column_id()) {
            ret = OB_ERR_INVALID_COLUMN_ID;
            LOG_WARN("column id specified by create index mismatch with column id", K(ret), K(gen_col), K(index_id));
          } else {
            ObColumnSchemaV2* tmp_col = data_schema.get_column_schema(index_id);
            if (is_invalid || nullptr != tmp_col) {
              ret = OB_ERR_INVALID_COLUMN_ID;
              LOG_WARN("invalid id", K(ret));
            } else {
              tmp_gen_col.set_column_id(index_id);
            }
          }
        }
        if (OB_FAIL(ret)) {
          // do nothing
        } else if (OB_FAIL(databuff_printf(col_name_buf,
                       OB_MAX_COLUMN_NAMES_LENGTH,
                       pos,
                       "SYS_NC%ld$", /*naming rules are compatible with oracle*/
                       tmp_gen_col.get_column_id()))) {
          LOG_WARN("print generate column prefix name failed", K(ret));
        } else if (OB_FAIL(tmp_gen_col.set_column_name(col_name_buf))) {
          LOG_WARN("set column name failed", K(ret));
        } else if (OB_FAIL(tmp_gen_col.set_orig_default_value(default_value))) {
          LOG_WARN("set orig default value failed", K(ret));
        } else if (OB_FAIL(tmp_gen_col.set_cur_default_value(default_value))) {
          LOG_WARN("set current default value failed", K(ret));
        } else if (OB_FAIL(data_schema.add_column(tmp_gen_col))) {
          LOG_WARN("add column schema to data table failed", K(ret));
        } else {
          gen_col = data_schema.get_column_schema(tmp_gen_col.get_column_id());
        }
      }
    }
  }
  return ret;
}

int ObIndexBuilderUtil::generate_prefix_column(
    const ObColumnSortItem& sort_item, ObTableSchema& data_schema, ObColumnSchemaV2*& prefix_col)
{
  int ret = OB_SUCCESS;
  ObColumnSchemaV2* old_column = NULL;
  ObColumnSchemaV2 prefix_column;
  char col_name_buf[OB_MAX_COLUMN_NAMES_LENGTH] = {'\0'};
  SMART_VAR(char[OB_MAX_DEFAULT_VALUE_LENGTH], expr_def)
  {
    MEMSET(expr_def, 0, sizeof(expr_def));
    int64_t name_pos = 0;
    int64_t def_pos = 0;
    const uint64_t spec_id = sort_item.get_column_id();
    const bool is_invalid = (spec_id < OB_APP_MIN_COLUMN_ID || spec_id > OB_MIN_SHADOW_COLUMN_ID);
    if (OB_ISNULL(old_column = data_schema.get_column_schema(sort_item.column_name_))) {
      ret = OB_ERR_KEY_COLUMN_DOES_NOT_EXITS;
      LOG_USER_ERROR(OB_ERR_KEY_COLUMN_DOES_NOT_EXITS, sort_item.column_name_.length(), sort_item.column_name_.ptr());
    } else if (old_column->is_generated_column()) {
      ret = OB_NOT_SUPPORTED;
      LOG_USER_ERROR(OB_NOT_SUPPORTED, "prefix index on generated column");
      LOG_WARN("prefix index on generated column not supported", K(ret), KPC(old_column));
    } else if (OB_FAIL(sql::ObDDLResolver::check_prefix_key(sort_item.prefix_len_, *old_column))) {
      LOG_WARN("Incorrect prefix key", K(sort_item), K(ret));
    } else if (OB_FAIL(databuff_printf(col_name_buf,
                   OB_MAX_COLUMN_NAMES_LENGTH,
                   name_pos,
                   "__substr%d_%ld",
                   sort_item.prefix_len_,
                   old_column->get_column_id()))) {
      LOG_WARN("print generate column prefix name failed", K(ret));
    } else if ((prefix_col = data_schema.get_column_schema(ObString(name_pos, col_name_buf))) != NULL) {
      // required column already exist for the index, so don't create it, just ref the column
      if (OB_INVALID_ID != spec_id) {
        // only in backup-restore mode, create index would set column id -->
        // ObCreateIndexResolver::resolve_index_column_node
        if (spec_id != prefix_col->get_column_id()) {
          ret = OB_ERR_INVALID_COLUMN_ID;
          LOG_USER_ERROR(OB_ERR_INVALID_COLUMN_ID, sort_item.column_name_.length(), sort_item.column_name_.ptr());
          LOG_WARN("Column id specified by create prefix index mismatch with column schema id",
              K(ret),
              K(spec_id),
              K(is_invalid),
              K(data_schema));
        }
      }
    } else if (OB_FAIL(databuff_printf(expr_def,
                   OB_MAX_DEFAULT_VALUE_LENGTH,
                   def_pos,
                   "SUBSTR(`%s`, 1, %d)",
                   old_column->get_column_name(),
                   sort_item.prefix_len_))) {
      LOG_WARN("print generate expr definition prefix failed", K(ret));
    } else {
      ObObj default_value;
      default_value.set_varchar(expr_def, static_cast<int32_t>(def_pos));
      prefix_column = *old_column;
      prefix_column.del_column_flag(HEAP_ALTER_ROWKEY_FLAG);  // clear flag
      if (!prefix_column.is_valid()) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("prefix column is invalid", K(ret));
      } else if (OB_FAIL(prefix_column.set_column_name(col_name_buf))) {
        LOG_WARN("set column name failed", K(ret));
      } else if (OB_FAIL(prefix_column.set_orig_default_value(default_value))) {
        LOG_WARN("set orig default value failed", K(ret));
      } else if (OB_FAIL(prefix_column.set_cur_default_value(default_value))) {
        LOG_WARN("set current default value to prefix column failed", K(ret));
      } else if (OB_FAIL(prefix_column.add_cascaded_column_id(old_column->get_column_id()))) {
        LOG_WARN("add cascaded column id failed", K(ret));
      } else {
        if (ob_is_text_tc(prefix_column.get_data_type())) {
          prefix_column.set_data_type(ObVarcharType);
          prefix_column.set_data_scale(0);
        }
        prefix_column.set_rowkey_position(0);
        prefix_column.set_index_position(0);
        prefix_column.set_tbl_part_key_pos(0);
        int32_t data_len = static_cast<int32_t>(min(sort_item.prefix_len_, old_column->get_data_length()));
        prefix_column.set_data_length(data_len);
        prefix_column.add_column_flag(VIRTUAL_GENERATED_COLUMN_FLAG);
        prefix_column.set_is_hidden(true);  // for debug
        old_column->add_column_flag(GENERATED_DEPS_CASCADE_FLAG);
        prefix_column.set_prev_column_id(UINT64_MAX);
        prefix_column.set_next_column_id(UINT64_MAX);
        if (OB_INVALID_ID != spec_id) {
          ObColumnSchemaV2* tmp_col = data_schema.get_column_schema(spec_id);
          if (is_invalid || NULL != tmp_col) {
            ret = OB_ERR_INVALID_COLUMN_ID;
            LOG_USER_ERROR(OB_ERR_INVALID_COLUMN_ID, sort_item.column_name_.length(), sort_item.column_name_.ptr());
            LOG_WARN("Column id specified by create prefix index mismatch with column schema id",
                K(ret),
                K(spec_id),
                K(is_invalid),
                K(data_schema));
          } else {
            prefix_column.set_column_id(spec_id);
          }
        } else {
          prefix_column.set_column_id(data_schema.get_max_used_column_id() + 1);
        }
        if (OB_FAIL(ret)) {
        } else if (OB_FAIL(data_schema.add_column(prefix_column))) {
          LOG_WARN("add column to data schema failed", K(ret));
        } else {
          prefix_col = data_schema.get_column_schema(prefix_column.get_column_id());
        }
      }
    }
  }
  return ret;
}
}  // namespace share
}  // end namespace oceanbase
